#!/usr/bin/perl -w
###############################################################################
#  $Id$
#******************************************************************************
#  Copyright (C) 2007-2009  Lawrence Livermore National Security, LLC.
#  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
#  Written by Adam Moody <moody20@llnl.gov> and
#             Mark Grondona <mgrondona@llnl.gov>
#
#  UCRL-CODE-235340.
#
#  This file is part of sqlog.
#
#  This is free software; you can redistribute it and/or modify it
#  under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This is distributed in the hope that it will be useful, but WITHOUT
#  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
#  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
#  for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, see <http://www.gnu.org/licenses/>.
###############################################################################
#
#  sqlog-db-util - SQLOG job database maintenance.
#
###############################################################################
use strict;
use lib qw();	# Required for _perl_libpaths RPM option
use DBI;
use Digest::SHA1 qw/ sha1_hex /;
use Getopt::Long qw/ :config gnu_getopt ignore_case /;
use File::Basename;
use Hostlist;
use Time::HiRes qw( gettimeofday );

# This file contains the SQL statements needed
# to set up a 'slurm_job_log' table in a 'slurm' DB
# on a MySQL server.
#
# It can also be used to backfill the database by
# inserting records from a list of slurm job completion
# logfiles.
#
# Adam Moody <moody20@llnl.gov>

# Required for _path_env_var RPM option
$ENV{PATH} = '/bin:/usr/bin:/usr/sbin';

# Script-wide settings: usage text, configuration values and
# command-line options all live in this hash.
my %conf = ();

##############################
#  Usage:
#############################
my $progname = basename $0;

# Help text printed by usage ().
$conf{usage} = <<EOF
Usage: $progname [OPTIONS]... [FILES]...

Create SLURM job completion log database along with user accounts to
access it, and/or backfill the database from SLURM job completion
logfiles.

    -h, --help         Display this message.
    -i, --info         Print information about current DB.
    -v, --verbose      Be verbose.
    -d, --drop=V       Drop tables for version V={1,2} of database schema.
    -c, --create       Create slurm database, users, and latest version
                        of database tables.
    -b, --backfill     Backfill database from all SLURM joblog files in ARGV.
    -x, --convert      Convert data from database schema version 1 to version 2.

    -B, --backup=RANGE Copy data from tables over RANGE to a file in a format 
                       readable by the --backfill option. RANGE can be 
                       specified as "all" or a date range in the form
                       DATE..DATE. DATE must be in a format of
                       'yyyy-mm-dd hh:mm:ss'.
                       
    -o, --obfuscate    Obfuscates usernames, userids, and jobnames during
                       backup operations, which is useful for sharing system
                       joblogs with offsite collaborators.

    -p, --prune=DATE   Prune database of all jobs with start times older 
                       than DATE; write such records to a file.  DATE must 
                       be in format of 'yyyy-mm-dd hh:mm:ss'

    -C, --cores-per-node=N       
                       During --backfill, --convert, --backup, or --prune, 
                       specify the number of cores per node used to compute 
                       corecount field on clusters that allocate whole
                       nodes to jobs.

    --notrack          Disable per-job node tracking for jobs inserted during 
                       --convert or --backfill operations.

    --delay-index      Temporarily disable node tracking indices for jobs 
                       inserted during convert or backfill operations.

    --recalc-nodecnt   When backfilling, do not use NodeCnt as stored in the
                       joblog file. Instead recalculate based on  nodelist.

    -L, --localhost    Connect to DB over localhost instead of configured 
                       SQL host.

EOF
;

# Print the usage message on STDERR and terminate the program with
# exit status 0.  Never returns.
sub usage { print STDERR $conf{usage}; exit 0; }

#############################
#  Read Config File.
#############################

# Built-in defaults; read_config () may replace any of them with values
# taken from the configuration files.
@conf{qw(confdir db sqlhost)} = ("/etc/slurm", "slurm", "sqlhost");

# read-only database account
$conf{ro} = {
    sqluser => "slurm_read",
    sqlpass => "",
};

# read-write database account, plus the network pattern the read-only
# account is granted access from
$conf{rw} = {
    sqluser    => "slurm",
    sqlpass    => "",
    sqlnetwork => "192.168.%.%",
};

# enables / disables node tracking per job in version 2 schema
$conf{track} = 1;

read_config ();


##############################
#  Parse Command-line
#############################

# set defaults and read in command-line options
$conf{verbose}   = 0;
$conf{info}      = 0;
$conf{drop}      = 0;
$conf{create}    = 0;
$conf{backfill}  = "";
$conf{convert}   = 0;      # convert data in version 1 table to version 2
$conf{backup}    = 0;
$conf{obfuscate} = 0;
$conf{prune}     = undef;
# used to set corecount field during convert or backfill
# for machines which allocate whole nodes
$conf{cores}     = undef;
# if set to 0, remove node-tracking indices, insert nodes,
# and re-enable indices
$conf{indicies}  = 1;
$conf{localhost} = 0;

# A command line that cannot be parsed is an error: show the usage text
# and exit with a non-zero status so that calling scripts can detect it.
GetOptions (
   "help|h"         => \$conf{help},
   "verbose|v+"     => \$conf{verbose},
   "info|i"         => \$conf{info},
   "drop|d=i"       => \$conf{drop},
   "create|c"       => \$conf{create},
   "backfill|b"     => \$conf{backfill},
   "convert|x"      => \$conf{convert},
   "backup|B=s"     => \$conf{backup},
   "obfuscate|o"    => \$conf{obfuscate},
   "prune|p=s"      => \$conf{prune},
   "cores-per-node|C=i"  => \$conf{cores},
   "notrack"        => sub { $conf{track} = 0; },
   "delay-index"    => sub { $conf{indicies} = 0; },
   "localhost|L"    => \$conf{localhost},
   "recalc-nodecnt" => \$conf{recalculate_nodecount},
) or do { print STDERR $conf{usage}; exit 1; };

# An explicit request for help is not an error; usage () exits 0.
if ($conf{help}) {
    usage ();
}

# Without at least one action there is nothing to do; treat that as a
# usage error (non-zero exit status).
if (!$conf{create} && !$conf{convert} && !$conf{drop} &&
    !$conf{backfill} && !$conf{backup} && !$conf{prune} &&
    !$conf{info}) {
    log_error ("Specify at least one of " .
               "--{create,convert,drop,backfill,backup,prune,info}.\n");
    print STDERR $conf{usage};
    exit 1;
}

#############################
# Attempt to connect to slurm database
#############################

# Try to connect; a false $dbh means the connection could not be made
# (for example because the slurm db does not exist yet).
my $dbh = connect_db_rw ();

# State shared with the backup code: lookup table and counters used to
# obfuscate user and job names.
my %obfuscate = ();
my $num_users = 0;
my $num_jobs  = 0;

# backup / prune -- writes records to a file readable by --backfill
if ($conf{backup} or defined $conf{prune}) {
    # a database connection is mandatory
    log_fatal ("Data dump requested, but connection to database failed!\n")
        unless $dbh;

    # exactly one file name must be left on the command line
    log_fatal ("You must specify a date range and" .
               " a filename to append data to.\n")
        unless @ARGV == 1;

    my $joblog = shift @ARGV;

    # dump every schema version whose job table is present:
    # [ schema version, name of the table holding its job records ]
    my @schemas = ([1, "slurm_job_log"], [2, "jobs"]);
    foreach my $schema (@schemas) {
        my ($version, $table) = @$schema;
        dump_slurm_joblog_table ($version, $dbh, $joblog)
            if table_exists ($dbh, $table);
    }
}

#
#  Drop existing tables
#
if ($conf{drop}) {
    if (!$dbh) {
        log_verbose ("drop: No existing slurm DB to drop\n");
    } else {
        if ($conf{drop} == 1) {
            log_verbose ("drop: Dropping version 1 tables\n");
            drop_slurm_joblog_table_v1 ($dbh);
        } elsif ($conf{drop} == 2) {
            log_verbose ("drop: Dropping version 2 tables\n");
            drop_slurm_joblog_table_v2 ($dbh);
        } else {
            # An unsupported version is a user error; report it even when
            # not running verbosely instead of silently doing nothing.
            log_error ("drop: Unknown schema version: $conf{drop}\n");
        }
        # The connection is closed and $dbh cleared here, so a --create
        # given in the same run takes the "no connection" path below.
        $dbh = disconnect_db_rw ();
    }
    # TODO: should we also delete the slurm db and users
    #  (i.e., undo everything create does?)
}

#
#  Create database
#
if ($conf{create}) {
    if (!$dbh) {
        # the db may not exist (couldn't connect), try to create it
        create_db_and_slurm_users ();

        # try to connect again
        $dbh = connect_db_rw ();
        log_fatal ("create: Failed to connect to SLURM DB after create!\n")
            unless $dbh;

        # create version 2 from the beginning on a brand new install
        log_verbose ("create: Creating version 2 tables.\n");

        create_slurm_joblog_table_v2 ($dbh);
    } elsif (table_exists ($dbh, "jobs")) {
        # connected and the version 2 job table is already there
        log_verbose ("create: SLURM database already exists.\n");
    } else {
        # connected, but version 2 tables do not exist yet: create them
        log_verbose ("create: Creating version 2 tables.\n");
        create_slurm_joblog_table_v2 ($dbh);
    }
}

# 
# Convert slurm_job_log table to version 2 
#  (add corecount and extend nodelist columns)
#
if ($conf{convert}) {
    #
    #  Try to convert the version 1 table to version 2; both a missing
    #   database connection and a failed conversion are fatal.
    #
    #  If the table has already been converted, a message is printed
    #   and no action is taken
    #
    log_fatal ("convert: Conversion requested," .
               " but connection to database failed.\n")
        unless $dbh;

    log_verbose ("convert: Initiating conversion from" .
                 " version 1 to version 2 tables.\n");

    convert_slurm_joblog_table_from_v1_to_v2 ($dbh)
        or log_fatal ("convert: SLURM job log table conversion failed.\n");
}

#
#  Backfill from logfiles
#
if ($conf{backfill}) {
    log_fatal ("backfill: Backfill requested," .
               " but connection to database failed!\n")
        unless $dbh;

    # Prefer the version 2 schema; fall back to version 1; give up when
    # neither job table is present.
    if (table_exists ($dbh, "jobs")) {
        backfill_slurm_joblog_table_to_v2 ($dbh, @ARGV);
    } elsif (table_exists ($dbh, "slurm_job_log")) {
        backfill_slurm_joblog_table_to_v1 ($dbh, @ARGV);
    } else {
        log_fatal ("backfill: Unknown schema version.\n");
    }
}

show_info () if $conf{info};

# close the read-write connection (if any) before leaving
disconnect_db_rw ();

exit 0;

#############################
# Support functions
#############################

# Name of the host to connect to: "localhost" when --localhost was
# given, otherwise the configured SQL host.
sub db_host_string
{
    return "localhost" if $conf{localhost};
    return $conf{sqlhost};
}

# Connect to the slurm database as the read-write user.
#
# The handle (or undef when the connection failed) is stored in
# $conf{dbh}{rw} and also returned.  A failed connection is not fatal;
# it is only reported at verbose level so that callers can decide what
# to do about it.
sub connect_db_rw
{
    my $host = db_host_string ();
    # NOTE(review): the DSN ends with ':' right after the host name;
    # presumably harmless to the driver -- confirm.
    my $cstr = "DBI:mysql(PrintError=>0):" .
               "database=$conf{db};host=$host:";

    my $dbh = DBI->connect($cstr, $conf{rw}{sqluser}, $conf{rw}{sqlpass});
    if (!$dbh) {
        # report the host that was actually tried (it is "localhost"
        # with --localhost), not unconditionally the configured one
        log_verbose ("Unable to connect to MySQL DB as ",
                     "$conf{rw}{sqluser}\@$host: ", $DBI::errstr, "\n");
    }

    $conf{dbh}{rw} = $dbh;

    return ($dbh);
}

# Close the read-write connection stored in $conf{dbh}{rw}, if there is
# one, and forget the handle.  Returns undef.
sub disconnect_db_rw
{
    my $handle = $conf{dbh}{rw};
    return if !$handle;

    $handle->disconnect;
    $conf{dbh}{rw} = undef;
    return $conf{dbh}{rw};
}

# Connect to the MySQL server as root (no database is selected).
# The handle is kept in $conf{dbh}{root} and returned; log_fatal () is
# called when the connection cannot be established.
sub connect_db_root
{
    my $host = db_host_string ();
    my $str  = "DBI:mysql(PrintError=>0):host=$host;";

    my $root_dbh = DBI->connect ($str, "root", $conf{rw}{rootpass});
    $conf{dbh}{root} = $root_dbh;

    if (!$root_dbh) {
        log_fatal ("Unable to connect to MySQL DB as root\@$host: ",
                   $DBI::errstr, "\n");
    }

    return ($conf{dbh}{root});
}

# Report whether a table is listed by SHOW TABLES on the given handle.
#   $dbh   - database handle
#   $table - table name to look for (exact string match)
# Returns 1 when the table is listed, 0 otherwise (including when the
# SHOW TABLES statement cannot be executed).
sub table_exists
{
    my ($dbh, $table) = @_;

    my $sth = $dbh->prepare("SHOW TABLES;");
    return 0 unless $sth->execute();

    # scan the listing for the requested name
    while (my ($name) = $sth->fetchrow_array()) {
        return 1 if $name eq $table;
    }

    return 0;
}

# Load settings from the two configuration files in $conf{confdir} into
# %conf.
#
#   sqlog.conf         - database name, SQL host, read-only account and
#                        the node tracking switch
#   slurm-joblog.conf  - read-write account, root password, read-only
#                        network pattern and the list of read-write hosts
#                        (this file must be readable)
#
# Both files are Perl code run with do () and are expected to set
# variables in package "conf".  Values that are not set leave the
# existing defaults in %conf untouched.  Calls log_fatal () on errors.
sub read_config
{
    my $ro = "$conf{confdir}/sqlog.conf";
    my $rw = "$conf{confdir}/slurm-joblog.conf";

    # First read sqlog config to get SQLHOST and SQLDB
    #  (SQLUSER/SQLPASS from this file describe the read-only account)
    unless (my $rc = do $ro) {
        log_fatal ("Couldn't parse $ro: $@\n") if $@;
        log_fatal ("couldn't run $ro\n") if (defined $rc && !$rc);
    }

    $conf{db}          = $conf::SQLDB   if (defined $conf::SQLDB);
    $conf{sqlhost}     = $conf::SQLHOST if (defined $conf::SQLHOST);
    $conf{ro}{sqluser} = $conf::SQLUSER if (defined $conf::SQLUSER);
    $conf{ro}{sqlpass} = $conf::SQLPASS if (defined $conf::SQLPASS);

    # enable / disable per job node tracking
    $conf{track} = $conf::TRACKNODES if (defined $conf::TRACKNODES);

    # forget the read-only credentials so they cannot be mistaken for
    # read-write ones if the second file does not set them
    undef $conf::SQLUSER;
    undef $conf::SQLPASS;

    # Now read slurm-joblog.conf
    log_fatal ("Unable to read required config file: $rw.\n") unless -r $rw;
    unless (my $rc = do $rw) {
        log_fatal ("Couldn't parse $rw: $@\n") if $@;
        log_fatal ("couldn't run $rw\n") if (defined $rc && !$rc);
    }

    $conf{rw}{sqluser}    = $conf::SQLUSER     if (defined $conf::SQLUSER);
    $conf{rw}{sqlpass}    = $conf::SQLPASS     if (defined $conf::SQLPASS);
    $conf{rw}{rootpass}   = $conf::SQLROOTPASS if (defined $conf::SQLROOTPASS);
    $conf{rw}{sqlnetwork} = $conf::SQLNETWORK  if (defined $conf::SQLNETWORK);

    # "defined @array" is a fatal error on perl >= 5.22, so test the
    # array for emptiness instead.  Fall back to any host list already
    # present in %conf (or to an empty list).
    my @hosts = @conf::SQLRWHOSTS ? @conf::SQLRWHOSTS
                                  : @{ $conf{rw}{hosts} || [] };

    # drop empty entries and duplicates, keeping the first occurrence
    my %seen;
    $conf{rw}{hosts} = [ grep { $_ && !$seen{$_}++ } @hosts ];

    return;
}

# Connect to MySQL as root user to build slurm db
# and insert slurm and slurm_read users.
#
# Steps: create the database if needed, drop any existing accounts with
# the configured read-write / read-only user names, grant full access on
# the database to the read-write user from every configured host plus
# localhost, grant SELECT to the read-only user from the configured
# network pattern, and flush privileges.
#
# Account names, host names and the password are passed through
# $dbh->quote () so that special characters (e.g. a quote in the
# password) cannot break or alter the GRANT statements.
sub create_db_and_slurm_users
{
    my $dbh = connect_db_root ()
        or log_fatal ("Couldn't connect to database as root\n");

    #
    #  Abort if slurm_job_log table already exists.
    #  NOTE(review): the root connection selects no database (see
    #  connect_db_root), so SHOW TABLES presumably fails here and this
    #  guard never triggers -- confirm.
    if (table_exists ($dbh, "slurm_job_log") or table_exists ($dbh, "jobs")) {
        log_msg ("create: SLURM job log table exists. No create necessary.\n");
        return;
    }

    #############################
    # Create slurm db / table
    #############################

    log_verbose ("Creating slurm DB\n");
    do_sql ($dbh, "CREATE DATABASE IF NOT EXISTS $conf{db};");

    #############################
    # Set up slurm (r/w) and slurm_read (r/o) access
    #############################

    # Switch to management databases
    do_sql($dbh, "USE mysql;");

    log_verbose ("Dropping previous slurm joblog db users and privileges.\n");
    drop_slurm_users ($dbh);

    # set up permissions for different users of slurm database
    my $rw_user   = $conf{rw}{sqluser};
    my $q_rw_user = $dbh->quote ($rw_user);
    my $q_rw_pass = $dbh->quote ($conf{rw}{sqlpass});
    for my $host (@{$conf{rw}{hosts}}, "localhost") {
        log_verbose ("Granting rw privileges to $rw_user on $host\n");
        do_sql ($dbh,
                "GRANT ALL ON $conf{db}.* TO" .
                " $q_rw_user\@" . $dbh->quote ($host) .
                " IDENTIFIED BY $q_rw_pass");
    }

    # the read-only account is created with an empty password
    log_verbose ("Granting readonly privs to $conf{ro}{sqluser} " .
                 "on $conf{rw}{sqlnetwork}.\n");
    do_sql ($dbh,
            "GRANT SELECT ON $conf{db}.* TO " .
            $dbh->quote ($conf{ro}{sqluser}) . "\@" .
            $dbh->quote ($conf{rw}{sqlnetwork}) .
            " IDENTIFIED BY ''");

    # flush privileges to make our changes current
    log_verbose ("FLUSH PRIVILEGES\n");
    do_sql($dbh, "FLUSH PRIVILEGES;");

    # we're done
    log_verbose ("Done creating slurm joblog DB.\n");
}

# Print a summary of the job log database on STDOUT: host, account
# names, database name, detected schema version and the total number of
# job records (version 1 and version 2 tables added together).
# Returns nothing; silently returns early when the database cannot be
# reached or a statement cannot be prepared.
sub show_info
{
    my $dbh = connect_db_rw () or return;

    # determine what schema version we're at
    my $version = "UNKNOWN";
    if (table_exists ($dbh, "jobs")) {
      $version = 2;
    } elsif (table_exists ($dbh, "slurm_job_log")) {
      $version = 1;
    }

    log_verbose ("Connected to joblog database version $version\n");

    # Add up the job records of both schema versions.  A table that does
    # not exist makes execute () fail and simply contributes nothing.
    my $count = 0;
    foreach my $table ("slurm_job_log", "jobs") {
        my $stmt = "SELECT COUNT(*) FROM `$conf{db}`.`$table`;";
        my $sth = $dbh->prepare ($stmt) or return;
        if ($sth->execute ()) {
            my ($rows) = $sth->fetchrow_array;
            $count += $rows if defined $rows;
        }
    }

    # now we're ready to print
    log_msg ("Information for SLURM job log DB:\n");
    # show the host actually connected to (localhost with --localhost)
    print "DB Host:   " . db_host_string () . "\n";
    print "DB User:   $conf{ro}{sqluser}\n";
    print "RW User:   $conf{rw}{sqluser}\n";
    print "SLURM DB:  $conf{db}\n";
    print "Version:   $version\n";
    print "Job count: $count\n";

    return;
}

# Drop every MySQL account whose user name equals the configured
# read-only or read-write user, whatever host it is defined for.
#   $dbh - database handle able to read mysql.user and drop users
# Returns early (doing nothing) when the account list cannot be read.
sub drop_slurm_users
{
    my ($dbh) = @_;

    my $sth = $dbh->prepare ("SELECT user,host from mysql.user;") or return;
    $sth->execute () or return;

    # collect user@'host' specifications of the accounts to remove
    my @doomed = ();
    while (my $row = $sth->fetchrow_arrayref) {
        my ($user, $host) = @{$row}[0, 1];
        next unless $user eq "$conf{ro}{sqluser}" ||
                    $user eq "$conf{rw}{sqluser}";
        push (@doomed, "$user\@'$host'");
    }

    do_sql ($dbh, "DROP USER " . join (", ", @doomed)) if @doomed;
}

# execute (do) sql statement on dbh
#   $dbh  - database handle
#   $stmt - SQL statement text
# The statement is executed exactly once.  Returns 1 on success; on
# failure the statement and the driver's error text are logged with
# log_error () and 0 is returned.
sub do_sql {
    my ($dbh, $stmt) = @_;
    log_debug ("SQL: [$stmt]\n");
    # do () returns a true value ("0E0" for zero affected rows) on
    # success and undef on error
    if (not $dbh->do ($stmt)) {
        log_error ("FAILED SQL: $stmt ERROR: " . $dbh->errstr . "\n");
        return 0;
    }
    return 1;
}

####################
# Schema version 1 functions
####################

# Drop the version 1 job table (slurm_job_log) from the slurm database.
#   $dbh - database handle
# Returns 1 when both the USE and the DROP statement succeeded,
# 0 otherwise.  The drop is attempted even if selecting the db failed.
sub drop_slurm_joblog_table_v1
{
    my ($dbh) = @_;

    # switch to the slurm db
    my $use_ok = do_sql ($dbh, "USE $conf{db};");

    # now drop the table
    log_verbose ("drop: Dropping existing 'slurm_job_log' table\n");
    my $drop_ok = do_sql ($dbh, "DROP TABLE `slurm_job_log`;");

    return ($use_ok && $drop_ok) ? 1 : 0;
}

# Build the version 1 job table (slurm_job_log) if it does not exist.
#   $dbh - database handle
# Returns 1 when both the USE and the CREATE statement succeeded,
# 0 otherwise.
sub create_slurm_joblog_table_v1
{
    my $dbh = shift @_;
    my $success = 1;

    # switch to the slurm db
    if (not do_sql ($dbh, "USE $conf{db};")) { $success = 0; }

    # keep this schema around for historical record
    # (could enable one to build a v1 table if so desired)
    #
    # The storage engine is selected with ENGINE=; the older TYPE=
    # spelling is rejected by MySQL 5.5 and later.
    my $sql = "CREATE TABLE IF NOT EXISTS slurm_job_log (
        id        int(10)   NOT NULL AUTO_INCREMENT,
        jobid     int(10)   NOT NULL,
        username  char(100) NOT NULL,
        userid    int(10)   NOT NULL,
        jobname   char(100) NOT NULL,
        jobstate  char(25)  NOT NULL,
        partition char(25)  NOT NULL,
        timelimit int(10)   NOT NULL,
        starttime datetime  NOT NULL,
        endtime   datetime  NOT NULL,
        nodelist  varchar(1024) NOT NULL,
        nodecount int(10)   NOT NULL,
        PRIMARY KEY (id),
        UNIQUE INDEX jobid (jobid,starttime),
        INDEX username (username)
    ) ENGINE=MyISAM;";
    if (not do_sql ($dbh, $sql)) { $success = 0; }

    return $success;
}

# Build the "(v1,v2,...)" value tuple for one version 1 job record.
#   $dbh - database handle (used only for quoting)
#   $h   - hash reference holding the job's fields
# The first value is always NULL (the auto-increment id); the others are
# the quoted job fields in table column order.
sub value_string_v1
{
    my ($dbh, $h) = @_;

    # job fields in the order of the slurm_job_log columns
    my @fields = qw(
        JobId UserName UserNumb Name JobState Partition
        TimeLimit StartTime EndTime NodeList NodeCnt
    );

    my @parts = ("NULL", map { $dbh->quote($h->{$_}) } @fields);

    return "(" . join(',', @parts) . ")";
}

# Insert value tuples into the version 1 job table in batches, which is
# more efficient than one INSERT per job.
#   $dbh    - database handle
#   @values - value tuples as produced by value_string_v1 ()
# Tuples are sent 50 per INSERT IGNORE statement.  A failing statement
# is logged with log_error () and the remaining batches are still tried.
# Returns 1 when every batch was inserted, 0 otherwise.
sub insert_values_v1
{
    my ($dbh, @values) = @_;
    my $success = 1;

    # splice removes (up to) the next 50 tuples; an empty batch ends the loop
    while (my @batch = splice (@values, 0, 50)) {
        my $sql = "INSERT IGNORE INTO `$conf{db}`.`slurm_job_log` VALUES " .
            join(",", @batch) . ";";

        #log_debug ("SQL: $sql\n");
        if (not $dbh->do ($sql)) {
            log_error ("Inserting " . scalar (@batch) . " job record(s)" .
                       " failed: " . $dbh->errstr . "\n");
            $success = 0;
        }
    }

    return $success;
}

# given a dbh and list of slurm job completion logfiles,
# insert them into the dbh (version 1 schema)
#   $dbh   - database handle
#   @files - joblog files; names ending in ".gz" are read through
#            "gzip -dc", and "-" reads standard input
# Each line of a joblog holds whitespace separated Key=Value fields.
# Files that cannot be opened are reported and skipped.
# Returns 0 when the job table had to be created and that failed or when
# the database could not be selected, 1 otherwise.
sub backfill_slurm_joblog_table_to_v1
{
    my ($dbh, @files) = @_;
    my $success = 1;

    # switch to the slurm db
    if (not do_sql ($dbh, "USE $conf{db};")) { $success = 0; }

    # if our new table does not exist, create it
    if (not table_exists ($dbh, "slurm_job_log")) {
        if (not create_slurm_joblog_table_v1($dbh)) {
            return 0;
        }
    }

    log_error ("No files to backfill!\n") if (!@files);

  FILE:
    foreach my $file (@files) {
        my @values = ();
        my $count = 0;
        my $skipped = 0;

        # Open with an explicit mode so that characters in the file name
        # are never interpreted as an open mode or by a shell.
        my $is_pipe = ($file =~ /\.gz$/) ? 1 : 0;
        my $in;
        my $opened;
        if ($is_pipe) {
            # list form: gzip is run directly, without a shell
            $opened = open ($in, '-|', 'gzip', '-dc', '--', $file);
        } elsif ($file eq '-') {
            # keep "-" meaning standard input
            $opened = open ($in, '<&', \*STDIN);
        } else {
            $opened = open ($in, '<', $file);
        }
        if (!$opened) {
            log_error ("Failed to open \"$file\":$!\n");
            next FILE;
        }

        while (my $line = <$in>) {
            chomp $line;

            my %h = ();
            foreach my $part (split(" ", $line)) {
                # split on the first '=' only, so values that contain
                # '=' themselves are kept whole
                my ($key, $value) = split(/=/, $part, 2);
                # an empty value counts as "not set"
                $value = undef if (defined $value and $value eq "");
                $h{$key} = $value;
            }

            # Some very old joblog files may have the incorrect
            #  datetime format. Unfortunately, the year wasn't
            #  included in these, so we have to drop these entries :-(
            if (defined $h{StartTime} and $h{StartTime} =~ m{^\d\d/\d\d-}) {
                $skipped++;
                next;
            }

            # convert from slurm log to format for MySQL:
            # UserId has the form "name(number)"
            if (defined $h{"UserId"}) {
                my ($username, $usernumb) = ($h{"UserId"} =~ /(.+)\((\d+)\)/);
                if (defined $username and defined $usernumb) {
                    $h{"UserName"} = $username;
                    $h{"UserNumb"} = $usernumb;
                }
            }
            # timestamps use 'T' between date and time; MySQL wants a space
            if (defined $h{"StartTime"}) {
                $h{"StartTime"} =~ s/T/ /;
            }
            if (defined $h{"EndTime"}) {
                $h{"EndTime"}   =~ s/T/ /;
            }

            push @values, value_string_v1($dbh, \%h);

            # flush to the database once enough records are collected
            if (@values > 100) {
                insert_values_v1($dbh, @values);
                @values = ();
            }
            $count++;
        }
        insert_values_v1($dbh, @values);

        log_verbose ("Backfilled $count jobs from file $file\n");
        log_error ("Skipped $skipped job(s) from file $file because of ",
                  "old date format\n") if $skipped;

        # closing a pipe reports a non-zero exit status of gzip
        my $closed = close ($in);
        if (!$closed and $is_pipe) {
            log_error ("gzip reported an error while reading \"$file\"\n");
        }
    }

    return $success;
}

####################
# Schema version 2 functions
####################

# Cache of name => id lookups, keyed by table name first.  It saves us
# from hitting the database over and over at the cost of more memory.
# Note: the main program calls exit before execution reaches this line,
# so only the compile-time declaration takes effect; the functions below
# create the per-table entries on demand.
my %IDcache = (nodes => {});

# Ask the server for LAST_INSERT_ID (), i.e. the auto increment value of
# the last record inserted on this connection.
#   $dbh - database handle
# Returns the id, or undef (after logging an error) when the query
# cannot be executed.
sub get_last_insert_id
{
    my ($dbh) = @_;
    my $id;

    my $sql = "SELECT LAST_INSERT_ID();";
    my $sth = $dbh->prepare($sql);
    if (not $sth->execute()) {
        log_error ("Fetching last id: $sql\n");
        return $id;
    }

    ($id) = $sth->fetchrow_array();
    return $id;
}

# Look up the id stored for a name in one of the name tables.
#   $dbh   - database handle
#   $table - table to search
#   $name  - name to look up
# The id cache is consulted first; on a miss the table is queried and a
# found id is added to the cache.  Returns the id, or undef when the
# name is undefined, is not in the table, or the query fails (the
# failure is logged).
sub read_id
{
    my ($dbh, $table, $name) = @_;
    my $id;

    # an unset name is never looked up
    return $id if not defined $name;

    # per-table cache, created on first use
    my $cache = ($IDcache{$table} ||= {});
    return $cache->{$name} if defined $cache->{$name};

    # not cached: ask the database
    my $sql = "SELECT * FROM `$table` WHERE `name` = " .
              $dbh->quote($name) . ";";
    my $sth = $dbh->prepare($sql);
    if (not $sth->execute ()) {
        log_error ("Reading record: $sql --> " . $dbh->errstr . "\n");
        return $id;
    }

    # the first two columns of the row are taken as id and name
    my ($table_id, $table_name) = $sth->fetchrow_array ();
    if (defined $table_id and defined $table_name) {
        $cache->{$name} = $table_id;
        $id = $table_id;
    }

    return $id;
}

# Return the id of a name in a name table, inserting the name first when
# it is not there yet.
#   $dbh   - database handle
#   $table - name table to use
#   $name  - name to look up; undef is treated as the empty string
# Returns the id, or 0 (after logging an error) when the name could not
# be inserted or its id could not be read back.
sub read_write_id
{
    my ($dbh, $table, $name) = @_;

    # if name isn't set, set it to the empty string
    # DON'T do this in slurm-joblog, it will fail and
    # write to the joblog instead
    $name = "" unless defined $name;

    # the common case: the name is already known
    my $id = read_id($dbh, $table, $name);
    return $id if defined $id;

    # unknown name: insert it
    my $sql = "INSERT IGNORE INTO `$table` (`id`,`name`)" .
              " VALUES (NULL," . $dbh->quote($name) . ");";
    my $sth = $dbh->prepare($sql);
    if (not $sth->execute ()) {
        log_error ("Error inserting new record: $sql --> " .
                   $dbh->errstr . "\n");
        return 0;
    }

    # use read_id here instead of get_last_insert_id
    # to avoid race conditions
    $id = read_id ($dbh, $table, $name);
    if (not defined $id) {
        log_error ("Error inserting new record (id undefined): $sql\n");
        return 0;
    }
    if ($id == 0) {
        log_error ("Error inserting new record (id=0): $sql\n");
        return 0;
    }

    return $id;
}

# Make sure the id cache holds the ids of the given nodes, as far as the
# nodes table knows them.
#   $dbh       - database handle
#   $nodes_ref - reference to a list of node names
# Only nodes missing from the cache are queried, all in one statement.
# Returns 1 on success (including "nothing to look up"), 0 when the
# query failed (the failure is logged).
sub read_node_ids
{
    my ($dbh, $nodes_ref) = @_;

    # nodes we have no cached id for
    my @uncached = grep { not defined $IDcache{nodes}{$_} } @$nodes_ref;
    return 1 unless @uncached;

    my $in_nodes = join(",", map { $dbh->quote($_) } @uncached);
    my $sql = "SELECT * FROM `nodes` WHERE `name` IN ($in_nodes);";
    my $sth = $dbh->prepare($sql);
    if (not $sth->execute ()) {
        log_error ("Reading nodes: $sql --> " . $dbh->errstr . "\n");
        return 0;
    }

    # the first two columns of each row are taken as id and name
    while (my ($table_id, $table_name) = $sth->fetchrow_array ()) {
        $IDcache{nodes}{$table_name} = $table_id;
    }

    return 1;
}

# Make sure every given node has a record in the nodes table and that
# its id is in the id cache.
#   $dbh       - database handle
#   $nodes_ref - reference to a list of node names
# Known nodes are read first; the ones still missing are inserted with a
# single statement and then read back.
# Returns 0 when the insert statement failed (the failure is logged),
# 1 otherwise.
sub read_write_node_ids
{
    my ($dbh, $nodes_ref) = @_;

    # read node_ids for these nodes into our cache
    read_node_ids($dbh, $nodes_ref);

    # whatever is still uncached has to be inserted
    my @uncached = grep { not defined $IDcache{nodes}{$_} } @$nodes_ref;
    return 1 unless @uncached;

    # one "(name)" tuple per missing node
    my $values = join("),(", map { $dbh->quote($_) } @uncached);
    my $sql = "INSERT IGNORE INTO `nodes` (`name`) VALUES ($values);";
    my $sth = $dbh->prepare($sql);

    my $success = 1;
    if (not $sth->execute ()) {
        log_error ("Inserting nodes: $sql --> " . $dbh->errstr . "\n");
        $success = 0;
    }

    # fetch ids for just inserted nodes
    read_node_ids($dbh, $nodes_ref);

    return $success;
}

# Record which nodes a job ran on: one jobs_nodes record per node.
#   $dbh      - database handle
#   $job_id   - id of the job record
#   $nodelist - node list of the job in hostlist notation
# Nothing is done (and 1 is returned) when the job id or node list is
# missing or the node list is empty.  Returns 0 when inserting the
# jobs_nodes records failed (the failure is logged), 1 otherwise.
sub insert_job_nodes
{
    my ($dbh, $job_id, $nodelist) = @_;

    return 1
        unless (defined $job_id and defined $nodelist and $nodelist ne "");

    my $q_job_id = $dbh->quote($job_id);

    # clean up potentially bad nodelist:
    # an opening bracket without a closing one means the nodelist is
    # probably incomplete, so chop back to the last ',' or '-' and
    # replace it with a ']'
    if ($nodelist =~ /\[/ and $nodelist !~ /\]/) {
        $nodelist =~ s/[,-]\d+$/\]/;
    }

    # get our nodeset
    my @nodes = Hostlist::expand($nodelist);

    # this will fill our node_id cache
    read_write_node_ids($dbh, \@nodes);

    # one (job_id,node_id) tuple for each node we have an id for
    my @values =
        map  { "($q_job_id," . $dbh->quote($IDcache{nodes}{$_}) . ")" }
        grep { defined $IDcache{nodes}{$_} } @nodes;

    # no known nodes for this job: nothing to insert
    return 1 unless @values;

    my $sql = "INSERT DELAYED IGNORE INTO `jobs_nodes`" .
              " (`job_id`,`node_id`)" .
              " VALUES " . join(",", @values) . ";";
    my $sth = $dbh->prepare($sql);
    if (not $sth->execute ()) {
        log_error ("Inserting jobs_nodes records for job id" .
                   " $job_id: $sql --> " . $dbh->errstr . "\n");
        return 0;
    }

    return 1;
}

# Convert a 'yyyy-mm-dd hh:mm:ss' timestamp into seconds since the
# epoch.  The timestamp is interpreted in the local time zone by
# timelocal, which is how DST changes are attempted to be accounted for.
#   $date - timestamp string
sub get_seconds
{
    my ($date) = @_;
    use Time::Local;

    my ($year, $mon, $mday, $hour, $min, $sec) =
        ($date =~ /(\d\d\d\d)\-(\d\d)\-(\d\d) (\d\d):(\d\d):(\d\d)/);

    # timelocal expects a zero-based month and the year minus 1900
    $mon  -= 1;
    $year -= 1900;

    return timelocal ($sec, $min, $hour, $mday, $mon, $year);
}

# given hash of values, create mysql values string for insert statement
# (version 2 schema)
#   $dbh - database handle (used for quoting and for the id lookups)
#   $h   - hash reference holding the job's fields; Procs is filled in
#          from NodeCnt and --cores-per-node when it is not set
# User name, job name, job state and partition are replaced by their ids
# from the name tables (names are inserted when new).
# Returns the "(v1,v2,...)" value tuple in table column order.
sub value_string_v2
{
    my $dbh = shift @_;
    my $h   = shift @_;

    # given start and end times, compute the number of seconds
    # the job ran for
    # TODO: unsure whether this correctly handles jobs that
    # straddle DST changes
    my $seconds = 0;
    if (defined $h->{StartTime} and $h->{StartTime} !~ /^\s*$/ and
        defined $h->{EndTime}   and $h->{EndTime}   !~ /^\s*$/)
    {
         my $start = get_seconds($h->{StartTime});
         my $end   = get_seconds($h->{EndTime});
         $seconds = $end - $start;
         if ($seconds < 0) { $seconds = 0; }
    }

    # if Procs is not set, but cores is specified and NodeCnt is set,
    # compute Procs
    # (assumes all processors on the node were allocated to the job,
    # only use for clusters which use whole-node allocation)
    if (not defined $h->{Procs} and defined $conf{cores} and
        defined $h->{NodeCnt}
       )
    {
      $h->{Procs} = $h->{NodeCnt} * $conf{cores};
    }

    # get id values
    my $username_id  = read_write_id($dbh, "usernames",  $h->{UserName});
    my $jobname_id   = read_write_id($dbh, "jobnames",   $h->{Name});
    my $jobstate_id  = read_write_id($dbh, "jobstates",  $h->{JobState});
    my $partition_id = read_write_id($dbh, "partitions", $h->{Partition});

    # read_write_id () signals failure with 0 (it never returns undef),
    # so a missing id has to be detected by testing for a false value.
    # The record is still built, as before; the problem is only reported.
    if (not $username_id or
        not $jobname_id or
        not $jobstate_id or
        not $partition_id)
    {
        # make unset fields and ids printable
        my ($jobid, $username, $jobname, $jobstate, $partition) =
            map { defined $_ ? $_ : "(undef)" }
            @{$h}{qw(JobId UserName Name JobState Partition)};
        my @ids = map { defined $_ ? $_ : "(undef)" }
            ($username_id, $jobname_id, $jobstate_id, $partition_id);

        log_error ("Missing an id for one of: jobid=$jobid," .
                   " username=$username, jobname=$jobname," .
                   " jobstate=$jobstate, partition=$partition\n");
        log_error ("Missing an id for one of: $ids[0] -- $ids[1]" .
                   " -- $ids[2] -- $ids[3]\n");
    }

    # insert the field values, order matters
    my @parts = ();
    push @parts, (defined $h->{Id}) ? $dbh->quote($h->{Id}) : "NULL";
    push @parts, $dbh->quote($h->{JobId});
    push @parts, $dbh->quote($username_id);
    push @parts, $dbh->quote($h->{UserNumb});
    push @parts, $dbh->quote($jobname_id);
    push @parts, $dbh->quote($jobstate_id);
    push @parts, $dbh->quote($partition_id);
    push @parts, $dbh->quote($h->{TimeLimit});
    push @parts, $dbh->quote($h->{StartTime});
    push @parts, $dbh->quote($h->{EndTime});
    push @parts, $dbh->quote($seconds);
    push @parts, $dbh->quote($h->{NodeList});
    push @parts, $dbh->quote($h->{NodeCnt});
    push @parts, (defined $h->{Procs}) ? $dbh->quote($h->{Procs}) : 0;

    # finally, return the ('field1','field2',...) string
    return "(" . join(',', @parts) . ")";
}

# Drop every table that makes up the version 2 schema.
#
#   $dbh - open database handle
#
# Selects the $conf{db} database and then issues one DROP TABLE per
# version 2 table.  Every statement is attempted even if an earlier
# one fails.  Returns 1 if all statements succeeded, 0 otherwise.
sub drop_slurm_joblog_table_v2
{
    my ($dbh) = @_;

    # statements to run, in order: select the db, then drop each table
    my @statements = ("USE $conf{db};");
    foreach my $table (qw(jobs usernames jobnames jobstates
                          partitions nodes jobs_nodes)) {
        push @statements, "DROP TABLE `$table`;";
    }

    # run them all, remembering whether any of them failed
    my $failures = 0;
    foreach my $sql (@statements) {
        $failures++ unless do_sql ($dbh, $sql);
    }

    return $failures ? 0 : 1;
}

# Build all tables of the version 2 schema.
#
#   $dbh - open database handle
#
# Selects the $conf{db} database and issues CREATE TABLE IF NOT EXISTS
# for `jobs`, the five name-to-id lookup tables, and `jobs_nodes`.
# Every statement is attempted even if an earlier one fails.
# Returns 1 if all statements succeeded, 0 otherwise.
#
# The storage engine is selected with ENGINE=MyISAM; the older
# TYPE=MyISAM spelling is rejected as a syntax error by MySQL 5.5
# and later.
sub create_slurm_joblog_table_v2
{
    my $dbh = shift @_;
    my $success = 1;

    # switch to the slurm db
    if (not do_sql ($dbh, "USE $conf{db};")) { $success = 0; }

    # some jobs are canceled before ever being assigned resources;
    # `nodelist` is declared NOT NULL, so such jobs presumably carry
    # an empty nodelist rather than NULL
    my $sql = "CREATE TABLE IF NOT EXISTS `jobs` (
        `id`           INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
        `jobid`        INT          NOT NULL,
        `username_id`  INT UNSIGNED NOT NULL,
        `userid`       INT          NOT NULL,
        `jobname_id`   INT UNSIGNED NOT NULL,
        `jobstate_id`  INT UNSIGNED NOT NULL,
        `partition_id` INT UNSIGNED NOT NULL,
        `timelimit`    INT          NOT NULL,
        `starttime`    DATETIME     NOT NULL,
        `endtime`      DATETIME     NOT NULL,
        `runtime`      INT UNSIGNED NOT NULL,
        `nodelist`     BLOB         NOT NULL,
        `nodecount`    INT UNSIGNED NOT NULL,
        `corecount`    INT UNSIGNED NOT NULL,
        UNIQUE INDEX `jobid` (`jobid`,`starttime`),
        INDEX `username_id`  (`username_id`),
        INDEX `jobname_id`   (`jobname_id`),
        INDEX `starttime`    (`starttime`),
        INDEX `endtime`      (`endtime`),
        INDEX `runtime`      (`runtime`),
        INDEX `nodecount`    (`nodecount`),
        INDEX `corecount`    (`corecount`)
    ) ENGINE=MyISAM;";
    if (not do_sql ($dbh, $sql)) { $success = 0; }

    # NOTE: The UNIQUE INDEX below ensures that two jobs with
    # the same name (etc.) that complete around the same time
    # do not insert two records.  The downside is that the
    # index prefix length cannot be more than 1000 bytes, so
    # names must be limited by the prefix size.

    # lookup tables mapping a string to a unique id; all five share
    # the same layout (user names, partition names, job states,
    # job names and node names)
    foreach my $table (qw(usernames partitions jobstates jobnames nodes)) {
        $sql = "CREATE TABLE IF NOT EXISTS `$table` (
        `id`   INT UNSIGNED  NOT NULL AUTO_INCREMENT PRIMARY KEY,
        `name` VARCHAR(512) NOT NULL,
        UNIQUE INDEX `name` (`name`(512))
    ) ENGINE=MyISAM;";
        if (not do_sql ($dbh, $sql)) { $success = 0; }
    }

    # insert a <jobid,nodeid> record for each node a job uses
    $sql = "CREATE TABLE IF NOT EXISTS `jobs_nodes` (
        `job_id`  INT UNSIGNED NOT NULL,
        `node_id` INT UNSIGNED NOT NULL,
        UNIQUE INDEX `job_node` (`job_id`,`node_id`),
        INDEX `node_id` (`node_id`)
    ) ENGINE=MyISAM;";
    if (not do_sql ($dbh, $sql)) { $success = 0; }

    return $success;
}

# Convert all data in the version 1 table to the version 2 schema.
#
#   $dbh - open database handle
#
# Reads every row of `slurm_job_log` and inserts it into `jobs`
# (creating the version 2 tables first if `jobs` is missing).  When
# $conf{track} is set, the nodes used by each job are also recorded
# via insert_job_nodes.  When tracking with $conf{indicies} unset,
# the `jobs_nodes` indicies are dropped before the inserts and
# rebuilt afterwards.  Progress and timing are logged about every 1%.
#
# Returns 0 if the version 1 table is missing, the version 2 tables
# could not be created, or the USE, the SELECT or any INSERT failed;
# 1 otherwise.  Problems dropping or rebuilding indicies are only
# logged.
sub convert_slurm_joblog_table_from_v1_to_v2
{
    my $dbh = shift @_;
    my $success = 1;

    # switch to the slurm db
    if (not do_sql ($dbh, "USE $conf{db};")) { $success = 0; }

    # check that there is an older table to convert from
    if (not table_exists ($dbh, "slurm_job_log")) {
        log_msg ("convert: 'slurm_job_log' table does not exist.\n");
        return 0;
    }

    # if our new table does not exist, create it
    if (not table_exists ($dbh, "jobs")) {
        log_msg ("convert: 'jobs' table does not exist," .
                 " attempting to create it.\n");
        if (not create_slurm_joblog_table_v2($dbh)) {
            return 0;
        }
    }

    # if tracking and --noindicies was specified,
    # remove indicies before insert, we'll add them back later
    if ($conf{track} and not $conf{indicies}) {
        my $drop = "ALTER TABLE `jobs_nodes`" .
                   " DROP INDEX `job_node`, DROP INDEX `node_id`;";
        if (not do_sql ($dbh, $drop)) {
            log_error ("Problem dropping node tracking indicies.\n");
        }
    }

    # get the total count of jobs in the database
    # (used to print percentage of progress)
    my $total_count = 0;
    my $sth_count = $dbh->prepare("SELECT COUNT(*) FROM `slurm_job_log`;");
    if ($sth_count and $sth_count->execute()) {
        ($total_count) = $sth_count->fetchrow_array();
        $total_count = 0 if (not defined $total_count);
    }

    # number of records between two progress reports
    my $milemarker = int($total_count / 100);
    if ($milemarker == 0) { $milemarker = 1; }

    # now grab all of the jobs and insert them one-by-one
    my $sth_all_jobs = $dbh->prepare("SELECT * FROM `slurm_job_log`;");
    if ($sth_all_jobs and $sth_all_jobs->execute()) {
        my $count = 0;
        my $time_sum = 0;
        while (my @parts = $sth_all_jobs->fetchrow_array()) {
            # start timer
            my ($start_secs, $start_micros) = gettimeofday();

            my %h = ();
            # throw id away, we'll get a new one any way,
            # and this way we can run the conversion on a live machine,
            # since slurm-joblog.pl will be inserting records as this
            # conversion is running
            #$h{Id}        = $parts[0];
            $h{JobId}     = $parts[1];
            $h{UserName}  = $parts[2];
            $h{UserNumb}  = $parts[3];
            $h{Name}      = $parts[4];
            $h{JobState}  = $parts[5];
            $h{Partition} = $parts[6];
            $h{TimeLimit} = $parts[7];
            $h{StartTime} = $parts[8];
            $h{EndTime}   = $parts[9];
            $h{NodeList}  = $parts[10];
            $h{NodeCnt}   = $parts[11];
            # Procs field wasn't defined in version 1 schema
            #$h{Procs}     = $parts[12];

            # bug in version 1, which set nodecount to 1 for blank
            # hostlists; a NULL hostlist is treated as blank as well
            if (not defined $h{NodeList} or $h{NodeList} =~ /^\s*$/) {
                $h{NodeList} = "";
                $h{NodeCnt}  = 0;
            }

            # build the values string
            my $values = value_string_v2($dbh, \%h);

            # insert the job
            if ($conf{track}) {
                # insert the job, need to wait on the insert
                # since we need the job_id
                my $sql = "INSERT IGNORE INTO `$conf{db}`.`jobs`" .
                          " VALUES $values;";
                if (not do_sql($dbh, $sql)) {
                    $success = 0;
                } else {
                    # now insert nodes used by this job
                    my $job_id = get_last_insert_id ($dbh);
                    if (defined $job_id and $job_id != 0) {
                        insert_job_nodes ($dbh, $job_id, $h{NodeList});
                    }
                }
            } else {
                # insert the job, no need to wait on it
                my $sql = "INSERT DELAYED IGNORE INTO `$conf{db}`.`jobs`" .
                          " VALUES $values;";
                if (not do_sql($dbh, $sql)) { $success = 0; }
            }

            # stop timer and print timing and progress as we go
            my ($end_secs, $end_micros) = gettimeofday();
            my $micros = ($end_secs * 1000000 + $end_micros) -
                ($start_secs * 1000000 + $start_micros);
            $time_sum += $micros;
            $count++;
            if ($count % $milemarker == 0) {
                # $time_sum only covers the records handled since the
                # previous report, so average over that interval
                my $avg_time = int($time_sum / $milemarker);
                my $perc = "";
                if ($total_count > 0) {
                    $perc = sprintf("%.0f", $count / $total_count * 100);
                }
                log_msg ("Records converted $count ($perc%):" .
                         " $avg_time usec / record\n");
                $time_sum = 0;
            }
        }
    } else {
        # select against version 1 table failed
        $success = 0;
    }

    # rebuild indicies
    if ($conf{track} and not $conf{indicies}) {
        my $rebuild = "ALTER TABLE `jobs_nodes`" .
                      " ADD UNIQUE INDEX `job_node` (`job_id`,`node_id`)," .
                      " ADD INDEX `node_id` (`node_id`);";
        if (not do_sql ($dbh, $rebuild)) {
            log_error ("Problem rebuilding node tracking indicies.\n");
        }
    }

    return $success;
}

# Backfill data from job completion log files into the version 2 tables.
#
#   $dbh   - open database handle
#   @files - log files to read; names ending in ".gz" are read
#            through "gzip -dc"
#
# Each non-blank line is split into whitespace-separated KEY=VALUE
# fields, converted to the database format and inserted into `jobs`.
# When $conf{track} is set, the nodes used by each job are also
# recorded via insert_job_nodes.  Lines whose StartTime uses the old
# "mm/dd-" format are skipped.  A file that cannot be opened is
# logged and skipped.
#
# Returns 1 if every INSERT succeeded, 0 otherwise.
sub backfill_slurm_joblog_table_to_v2
{
    my $dbh = shift @_;
    my @files = @_;
    my $success = 1;

    # number of records between two progress reports
    my $report_interval = 1000;

    # switch to the slurm db
    do_sql ($dbh, "USE $conf{db};");

    # if our new table does not exist, create it
    if (not table_exists ($dbh, "jobs")) {
        create_slurm_joblog_table_v2($dbh);
    }

    log_error ("No files to backfill!\n") if (!@files);

    # if tracking and --noindicies was specified,
    # remove indicies before insert, we'll add them back later
    if ($conf{track} and not $conf{indicies}) {
        my $drop = "ALTER TABLE `jobs_nodes`" .
                   " DROP INDEX `job_node`," .
                   " DROP INDEX `node_id`;";
        if (not do_sql ($dbh, $drop)) {
            log_error ("Problem dropping node tracking indicies.\n");
        }
    }

    my $count = 0;
    my $time_sum = 0;
    foreach my $file (@files) {
        my $skipped = 0;
        my $file_count = 0;

        # open the file with an explicit mode; gzip is started without
        # a shell so characters in the file name are never interpreted
        my $in;
        my $opened = ($file =~ /\.gz$/)
                   ? open ($in, '-|', 'gzip', '-dc', $file)
                   : open ($in, '<', $file);
        if (not $opened) {
            log_error ("Failed to open \"$file\":$!\n");
            next;
        }

        while (my $line = <$in>) {
            # start timer
            my ($start_secs, $start_micros) = gettimeofday();

            chomp $line;

            # a blank line does not describe a job, nothing to insert
            next if ($line =~ /^\s*$/);

            my %h = ();
            foreach my $part (split(" ", $line)) {
                # split on the first '=' only, so values which contain
                # '=' themselves are kept intact; an empty value is
                # left undefined
                my ($key, $value) = split(/=/, $part, 2);
                $value = undef if (defined $value and $value eq "");
                $h{$key} = $value;
            }

            # Some very old joblog files may have the incorrect
            #  datetime format. Unfortunately, the year wasn't
            #  included in these, so we have to drop these entries :-(
            if (defined $h{StartTime} and $h{StartTime} =~ m{^\d\d/\d\d-}) {
                $skipped++;
                next;
            }

            if ($conf{recalculate_nodecount} && defined $h{NodeList}) {
                my $hostlist = $h{NodeList};
                if ($hostlist =~ /^\s*$/) {
                    $h{NodeCnt}  = 0;
                }
                else {
                    # NOTE(review): relies on Hostlist::expand yielding
                    # the number of hosts in scalar context -- confirm
                    $h{NodeCnt} = Hostlist::expand($hostlist);
                }
            }

            # convert from slurm log to format for MySQL
            if (defined $h{"UserId"}) {
                # UserId has the form name(number)
                my $userid = $h{"UserId"};
                my ($username, $usernumb) = ($userid =~ /(.+)\((\d+)\)/);
                if (defined $username and defined $usernumb) {
                    $h{"UserName"} = $username;
                    $h{"UserNumb"} = $usernumb;
                }
            }
            if (defined $h{"StartTime"}) {
                $h{"StartTime"} =~ s/T/ /;
            }
            if (defined $h{"EndTime"}) {
                $h{"EndTime"}   =~ s/T/ /;
            }

            # set the values
            my $values = value_string_v2($dbh, \%h);

            # insert the job
            if ($conf{track}) {
                # insert the job, need to wait on the insert
                # since we need the job_id
                my $sql = "INSERT IGNORE INTO `$conf{db}`.`jobs`" .
                          " VALUES $values;";
                if (not do_sql($dbh, $sql)) {
                    $success = 0;
                } else {
                    # now insert nodes used by this job
                    my $job_id = get_last_insert_id ($dbh);
                    if (defined $job_id and $job_id != 0) {
                        insert_job_nodes ($dbh, $job_id, $h{NodeList});
                    }
                }
            } else {
                # insert the job, no need to wait on it
                my $sql = "INSERT DELAYED IGNORE INTO `$conf{db}`.`jobs`" .
                          " VALUES $values;";
                if (not do_sql($dbh, $sql)) { $success = 0; }
            }

            # stop timer and print timing and progress as we go
            my ($end_secs, $end_micros) = gettimeofday();
            my $micros = ($end_secs * 1000000 + $end_micros) -
                ($start_secs * 1000000 + $start_micros);
            $time_sum += $micros;
            $count++;
            $file_count++;
            if ($count % $report_interval == 0) {
                # $time_sum only covers the records handled since the
                # previous report, so average over that interval
                my $avg_time = int($time_sum / $report_interval);
                log_msg ("Records converted $count:" .
                         " $avg_time usec / record\n");
                $time_sum = 0;
            }
        }

        log_verbose ("Backfilled $file_count jobs from file $file\n");
        log_error ("Skipped $skipped job(s) from file $file because of ",
                  "old date format\n") if $skipped;

        # for a gzip pipe, a failed close also reports a non-zero exit
        if (not close ($in)) {
            my $reason = $! ? "$!" : "exit status " . ($? >> 8);
            log_error ("Problem closing \"$file\": $reason\n");
        }
    }

    # rebuild indicies
    if ($conf{track} and not $conf{indicies}) {
        my $rebuild = "ALTER TABLE `jobs_nodes`" .
                      " ADD UNIQUE INDEX `job_node` (`job_id`,`node_id`)," .
                      " ADD INDEX `node_id` (`node_id`);";
        if (not do_sql ($dbh, $rebuild)) {
            log_error ("Problem rebuilding node tracking indicies.\n");
        }
    }

    return $success;
}

####################
# Utility functions
####################

# Append job records from the database to a job log file.
#
#   $version - schema version to read: values greater than 1 read the
#              `jobs` table, anything else reads `slurm_job_log`
#   $dbh     - open database handle
#   $joblog  - path of the file to append to
#
# Records are selected according to $conf{prune} (a date) or
# $conf{backup} ("all", DATE or DATE..DATE), ordered by starttime and
# id, and written one per line in KEY=VALUE format.  With
# $conf{obfuscate} set (and not pruning), user names, user ids and job
# names are replaced by numbered placeholders.  With $conf{prune} set,
# each record is deleted from the database after it has been written.
#
# Exits via log_fatal on an invalid date or range, on prune combined
# with obfuscate, or if the output file cannot be opened.
# Returns 0 if the source table is missing or if a statement, a write
# or the final close failed; 1 otherwise.
sub dump_slurm_joblog_table
{
    my $version = shift @_;
    my $dbh     = shift @_;
    my $joblog  = shift @_;
    my $success = 1;

    # switch to the slurm db
    if (not do_sql ($dbh, "USE $conf{db};")) { $success = 0; }

    # check that we have a table to get data from
    if ($version > 1) {
        if (not table_exists ($dbh, "jobs")) {
            log_msg ("'jobs' table does not exist.\n");
            return 0;
        }
    } else {
        if (not table_exists ($dbh, "slurm_job_log")) {
            log_msg ("'slurm_job_log' table does not exist.\n");
            return 0;
        }
    }

    # if prune is set, check that the date format is valid,
    # and check that we're not also obfuscating
    my $date = undef;
    if (defined $conf{prune}) {
        # can't prune and obfuscate at the same time
        if ($conf{obfuscate}) {
            log_fatal ("You cannot prune and obfuscate at the same time.\n");
        }

        # make sure the date is valid format
        if ($conf{prune} !~ /^\d\d\d\d\-\d\d\-\d\d \d\d:\d\d:\d\d$/) {
            log_fatal ("Invalid prune date: $conf{prune}." .
                       "  Must be in --prune='yyyy-mm-dd hh:mm:ss' format.\n");
        }

        # ok, build out date qualifier
        $date = "`starttime` < " . $dbh->quote($conf{prune});

        # TODO: if tracking, remove indicies from jobs_nodes and
        # add back after we're done?
    } elsif (defined $conf{backup}) {
        # make sure the date is valid format
        if ($conf{backup} =~ /^all$/i) {
            # nothing to do here
        } elsif ($conf{backup} =~ /^(\d\d\d\d\-\d\d\-\d\d \d\d:\d\d:\d\d)$/) {
            $date = "`starttime` < " . $dbh->quote($1);
        } elsif ($conf{backup} =~ /^(\d\d\d\d\-\d\d\-\d\d \d\d:\d\d:\d\d)\.\.(\d\d\d\d\-\d\d\-\d\d \d\d:\d\d:\d\d)$/) {
            $date = "`starttime` >= " . $dbh->quote($1) .
                    " AND `starttime` < " . $dbh->quote($2);
        } else {
            log_fatal ("Invalid backup range: $conf{backup}." .
                       "  Must be one of: \"all\", DATE, or DATE..DATE;" .
                       " where DATE is 'yyyy-mm-dd hh:mm:ss'.\n");
        }
    }

    # open our output file for appending; the explicit mode keeps
    # special characters in the file name from being interpreted
    my $joblog_fh;
    if (!open ($joblog_fh, '>>', $joblog)) {
        log_fatal ("Unable to open $joblog: $!\n");
    }

    my $stmt = "";

    # build a statement to get the total count of jobs in
    # the database (used to print percentage of progress);
    # a single aggregate row needs no ORDER BY
    my $total_count = 0;
    if ($version > 1) {
        $stmt = "SELECT COUNT(*) FROM `jobs`";
    } else {
        $stmt = "SELECT COUNT(*) FROM `slurm_job_log`";
    }
    if (defined $date) { $stmt .= " WHERE $date"; }
    $stmt .= ";";

    # get the count
    log_debug ("$stmt\n");
    my $sth_count = $dbh->prepare($stmt);
    if ($sth_count and $sth_count->execute()) {
        ($total_count) = $sth_count->fetchrow_array();
        $total_count = 0 if (not defined $total_count);
    }

    # number of records between two progress reports
    my $milemarker = int($total_count / 100);
    if ($milemarker == 0) { $milemarker = 1; }

    # build a statement to select our records
    if ($version > 1) {
        $stmt = "SELECT" .
        " `jobs`.*," .
        "`usernames`.`name` as `username`," .
        "`jobnames`.`name` as `jobname`," .
        "`jobstates`.`name` as `jobstate`," .
        "`partitions`.`name` as `partition`" .
        " FROM `jobs`" .
        " LEFT JOIN `usernames`  ON `jobs`.`username_id`  = `usernames`.`id`" .
        " LEFT JOIN `jobnames`   ON `jobs`.`jobname_id`   = `jobnames`.`id`" .
        " LEFT JOIN `jobstates`  ON `jobs`.`jobstate_id`  = `jobstates`.`id`" .
        " LEFT JOIN `partitions` ON `jobs`.`partition_id` = `partitions`.`id`";
    } else {
        $stmt = "SELECT * FROM `slurm_job_log`";
    }
    if (defined $date) { $stmt .= " WHERE $date"; }
    $stmt .= " ORDER BY `starttime`,`id` ASC;";

    # now grab all of the jobs and append them one-by-one
    log_debug ("$stmt\n");
    my $sth_all_jobs = $dbh->prepare($stmt);
    if ($sth_all_jobs and $sth_all_jobs->execute()) {
        my $count = 0;
        my $time_sum = 0;

        while (my $h = $sth_all_jobs->fetchrow_hashref()) {
            # start timer
            my ($start_secs, $start_micros) = gettimeofday();

            # bug in version 1, which set nodecount to 1 for blank
            # hostlists; a NULL hostlist is treated as blank as well
            if (not defined $$h{nodelist} or $$h{nodelist} =~ /^\s*$/) {
                $$h{nodelist} = "";
                $$h{nodecount}  = 0;
            }

            # set time to proper format
            $$h{starttime} =~ s/(\-\d\d) (\d\d:)/$1T$2/;
            $$h{endtime}   =~ s/(\-\d\d) (\d\d:)/$1T$2/;

            # set procs field
            my $procs = undef;
            if ($version > 1) {
                $procs = $$h{'corecount'};
            } elsif (defined $conf{cores}) {
                $procs = $$h{'nodecount'} * $conf{cores};
            }

            # optionally obfuscate username, userid, and jobname
            my $username = $$h{'username'};
            my $userid   = $$h{'userid'};
            my $jobname  = $$h{'jobname'};
            if ($conf{obfuscate} and not defined $conf{prune}) {
                # assign each distinct username a number on first sight
                if (not defined $obfuscate{usernames}{$username}) {
                    $num_users++;
                    $obfuscate{usernames}{$username} = $num_users;
                }

                # use the number assigned to THIS user for both the
                # name and the id, so a user keeps the same id on
                # every record
                my $user_num = $obfuscate{usernames}{$username};
                $username = "user_" . $user_num;
                $userid   = $user_num;

                # obfuscate jobname
                if (not defined $obfuscate{jobnames}{$jobname}) {
                    $num_jobs++;
                    $obfuscate{jobnames}{$jobname} = $num_jobs;
                }
                $jobname = "job_" . $obfuscate{jobnames}{$jobname};
            }

            # append record to file
            my @params = ();
            push @params, $$h{'jobid'};
            push @params, $username;
            push @params, $userid;
            push @params, $jobname;
            push @params, $$h{'jobstate'};
            push @params, $$h{'partition'};
            push @params, $$h{'timelimit'};
            push @params, $$h{'starttime'};
            push @params, $$h{'endtime'};
            push @params, $$h{'nodelist'};
            push @params, $$h{'nodecount'};

            my $format =
                "JobId=%s UserId=%s(%s) Name=%s JobState=%s Partition=%s " .
                "TimeLimit=%s StartTime=%s EndTime=%s NodeList=%s " .
                "NodeCnt=%s";
            if (defined $procs) {
                push @params, "$procs";
                $format .= " Procs=%s";
            }
            $format .= "\n";

            # never prune a record that could not be written out
            if (not printf {$joblog_fh} $format, @params) {
                log_error ("Problem writing to $joblog: $!\n");
                $success = 0;
                next;
            }

            # if we are pruning, delete the job and any associated records
            if (defined $conf{prune}) {
                my $id = $$h{'id'};
                if (defined $id) {
                    my $q_job_id = $dbh->quote($id);

                    # if tracking, first delete all node records
                    if ($version > 1 and $conf{track}) {
                        if (not do_sql ($dbh, "DELETE FROM `jobs_nodes`" .
                                              " WHERE `job_id` = $q_job_id;")) {
                            $success = 0;
                        }
                    }

                    # now delete the job record
                    my $table = ($version > 1) ? "jobs" : "slurm_job_log";
                    if (not do_sql ($dbh, "DELETE FROM `$table`" .
                                          " WHERE `id` = $q_job_id;")) {
                        $success = 0;
                    }
                }
            }

            # stop timer and print timing and progress as we go
            my ($end_secs, $end_micros) = gettimeofday();
            my $micros = ($end_secs * 1000000 + $end_micros) -
                ($start_secs * 1000000 + $start_micros);
            $time_sum += $micros;
            $count++;
            if ($count % $milemarker == 0) {
                # $time_sum only covers the records handled since the
                # previous report, so average over that interval
                my $avg_time = int($time_sum / $milemarker);
                my $perc = "";
                if ($total_count > 0) {
                    $perc = sprintf("%.0f", $count / $total_count * 100);
                }
                log_msg ("Records written $count ($perc%):" .
                         " $avg_time usec / record\n");
                $time_sum = 0;
            }
        }

        log_msg ("Wrote $count jobs to $joblog.\n");
    } else {
        # select against the source table failed
        $success = 0;
    }

    # buffered write errors only show up when the file is closed
    if (not close ($joblog_fh)) {
        log_error ("Problem closing $joblog: $!\n");
        $success = 0;
    }

    return $success;
}

#
#  Generate a digest of the password in the 41 character format
#   ('*' followed by 40 hex digits) when that is what the server's
#   PASSWORD() function produces.
#
#   $passwd - clear text password
#
#  The server is only asked to hash a fixed example string, to find
#   out which hash length it uses; the password itself is hashed
#   locally.  The 41 character format is the upper case hex SHA1 of
#   the binary SHA1 of the password, prefixed by '*'.
#
#  Exits via log_fatal if no DB handle is available or the probe
#   statement cannot be prepared.  Returns the digest string, or
#   nothing if the probe fails or the server uses the short format.
#
sub passwd_digest
{
    my $dbh = connect_db_root ();
    my $passwd = shift @_;

    log_fatal ("passwd_digest: Failed to get DB handle!\n") if !$dbh;

    my $sth = $dbh->prepare ("SELECT PASSWORD('example');")
        or log_fatal ($dbh->errstr);

    if (not $sth->execute ()) {
        log_error ("passwd_digest: unable to determine password" .
                   " hash format.\n");
        return;
    }

    my ($r) = $sth->fetchrow_array ();
    $sth->finish ();

    if (defined $r and length ($r) >= 41) {
        # second SHA1 round runs over the binary (not hex) digest
        my $stage1 = pack ("H*", sha1_hex ($passwd));
        return "*" . uc (sha1_hex ($stage1));
    }

    #  I don't know what the short password hash is, so
    #   use of this function is disabled for now.
    #
    #return (unpack ("H16", pack ("A13", $c)));
    return;
}

# Print the given message parts to STDERR, prefixed by the program name.
sub log_msg
{
    print STDERR "$progname: ", @_;
}

# Log the given message parts with an "Error: " prefix.
sub log_error
{
    log_msg ("Error: ", @_);
}

# Log the given message parts with a "Fatal: " prefix, then exit
# with status 1.
sub log_fatal
{
    log_msg ("Fatal: ", @_);
    exit 1;
}

# Log the given message parts only when $conf{verbose} is true.
sub log_verbose
{
    if ($conf{verbose}) { log_msg (@_); }
}

# Log the given message parts only when $conf{verbose} is above 1.
sub log_debug
{
    if ($conf{verbose} > 1) { log_msg (@_); }
}

# vi: ts=4 sw=4 expandtab
